{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "原文：<https://zhuanlan.zhihu.com/p/95840826>\n",
    "\n",
    "2019-12-13"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 前言\n",
    "\n",
    "在Redis的配置文件中，可以指定同时连接的最大客户端数目，这个数目默认为10000，但是我们又提到Redis是单线程服务客户端请求的，那么Redis是如何做到单线程服务这10000个客户端的呢？那就是上一篇文章提到的**IO多路复用技术**。如果只是给大家拽技术名词，那我的专栏就没什么存在的价值了。所以本篇我们将介绍IO复用技术以及深入的研究Redis是如何基于IO多路复用进行任务调度的。同时在文章的最后我们还会深入研究Python是如何基于IO多路复用技术进行协程调度的，你会惊讶的发现，两者的实现逻辑几乎完全一致。本篇需要较多的计算机网络的知识基础且涉及较多的源码分析。\n",
    "\n",
    "本篇主要会涉及到主要内容。\n",
    "\n",
    "1. **基础概念**：IO复用基于非阻塞IO，那么首先我们得明确什么是阻塞IO，什么是非阻塞IO\n",
    "\n",
    "2. **IO多路复用技术**：IO复用实际上是我们将文件的事件监控托管给了操作系统，那么UNIX操作系统下，主要提供了三类系统调用来帮助用户完成事件管理，分别是select, poll ,epoll, 其中epoll是Linux特有的系统调用，本节我们以性能最好的epoll为例进行介绍。\n",
    "\n",
    "3. **Redis的任务调度**： Redis要并发服务上万客户端，一定需要良好的任务调度，本节我们介绍Redis是如何借助IO多路复用技术实现高效的任务调度的。\n",
    "\n",
    "4. **Python的协程调度**：一个个python协程，实际上就是一个个任务，区别在于这些任务在被打断时能保存上下文环境，那我们在这一节会了解到Python是如何基于IO多路复用技术实现协程的调度的。\n",
    "\n",
    "# 基础概念明晰\n",
    "\n",
    "## IO模式\n",
    "\n",
    "在UNIX系统中，一切都是文件，每个文件都有一个唯一的**文件描述符fd(File Descriptor)**来代表。我们对文件的读写就是IO操作。文件是具有**可读、可写、异常**等几种状态的，只有文件处于可读状态的时候，我们才可以读到文件内容，只有文件处于可写状态的时候，我们才可以向文件写入内容。我们进行IO操作的时候会调用系统函数，这会让程序陷入到**内核态**中，是一个比较耗时的操作。如果我们以小明去水房接水类比调用read()函数进行文件读取操作，拧开水龙头发现水龙头没水，就代表此时文件不可读。如果小明在水房**一直等待**，直到水龙头有水了（文件变为可读状态）才接上水回去，这就是**阻塞IO模式**。即如果用户调用read()，直到读取上文件内容或者出错，这个函数才会有返回。但如果小明发现水龙头没水，就立马回去，**过一会再来查看**水龙头是不是有水，就是**非阻塞IO模式**。即无论文件可不可读，read函数都立马返回，如果可读就返回文件内容，如果不可读，就返回一个不可读标识符。在下文中，我们会发现**如果要使用IO多路复用，那么一定就要基于非阻塞的IO模式。**\n",
    "\n",
    "## IO多路复用\n",
    "\n",
    "在UNIX中客户端也用socket文件来代表。**读取客户端请求，就是从代表客户端的文件中读取数据，向客户端发送数据，就是向代表客户端的文件中写入数据。**如果有100个客户端连接上了服务器，Server就会创建100个文件描述符来代表客户端。如果其中一个文件描述符可读，就说明对应的客户端发起了请求，Server就可以读出请求然后进行处理，最后再写入数据发送给客户端。那么我们如何来并发的服务这连接上的100个客户端呢？\n",
    "\n",
    "第一种方式就是**轮询**这100个文件描述符，看哪个文件描述符可读，遇到第一个可读的文件描述符（假设其fd=3），就读出文件内容（客户端请求），然后处理请求，处理结束之后再把结果写入该文件。为了达到这样的目的，Server端一定要使用**非阻塞IO**，这样即使一个文件处于不可读状态，文件读取函数也能立刻返回，不然Server就会被阻塞在某一个文件的读操作从而浪费大量的时间。当服务器处理完结果，又会向fd=3的这个文件中写入数据，但如果这是该文件不可写入怎么办？第一种方案就是原地等待，一直等到可以写入的时候再写入,如下图代码所示。第二种就是把数据保存在某个地方，继续向后遍历，直到下次再遍历到fd=3的文件的时候，再检查是否可以写入，如果可以写入则直接写入，如果不可以写入，则继续上述流程。\n",
    "\n",
    "```py\n",
    "def Server():\n",
    "  while(1):\n",
    "    for client_fd in all_clients:\n",
    "        # 有请求就读取请求，并处理请求,无请求就查看下一个客户端文件\n",
    "        request = read(fd)\n",
    "        if request == UNREADABLE:\n",
    "          continue \n",
    "        result = process_request(request)\n",
    "        # 等到客户端socket文件可写入，就写入数据发送给客户端\n",
    "        while(not writeable(client_fd)):\n",
    "           pass\n",
    "        write(fd, result)\n",
    "```\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们发现在上述的方案中，Server要始终循环检查客户端文件的状态比如是否可读，是否可写以及是否异常等，即监控文件状态的工作是由Server自己完成的。为了解放Server， 操作系统提供了一组系统调用来帮助监控文件的事件，当文件变为可读，则告诉Server可读事件发生，Server即可读取文件内的请求，并处理请求。如果文件可写，就会通知可写事件发生，Server就会把对应的数据写入并发送给客户端。接下来主要介绍**Linux中特有的epoll系统调用**，至于select、poll，由于效率会比较低，目前用的不是很多，想了解可自行百度了解。\n",
    "\n",
    "## epoll\n",
    "\n",
    "epoll把用户关心的文件描述符上的事件放到一个内核中的事件表中。当用户询问的时候，epoll会告诉用户已经发生事件的事件列表，这样用户就可以直接对事件对应的文件描述符进行操作。epoll的使用主要有三个系统调用：\n",
    "```c\n",
    "#include<sys/epoll.h>\n",
    "int epoll_create(int size)\n",
    "int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event)\n",
    "int epoll_wait(int epfd,, struct epoll_event* event, int maxevents, int timeout)\n",
    "```\n",
    "\n",
    "其中epoll的事件`epoll_event`数据结构定义如下：\n",
    "\n",
    "```c\n",
    "struct epoll_event{\n",
    "    _uint32_t events; /*事件类型*/\n",
    "     epoll_data_t data;/*用户数据*/\n",
    "}\n",
    "```\n",
    "其中epoll_data_t是一个联合体，其定义如下\n",
    "\n",
    "```c\n",
    "typedef union epoll_data{\n",
    "    void* ptr;\n",
    "    int fd;/*绑定事件的文件描述符*/\n",
    "    uint32_t u32;\n",
    "    uint64 u64;\n",
    "} epoll_data_t;\n",
    "```\n",
    "\n",
    "由于epoll将用户关心的数据存到内核的一个**文件**中，所以首先得调用`epoll_create`创建一个文件，并返回其**文件描述符epfd。**\n",
    "\n",
    "```c\n",
    "int epoll_create(int size)\n",
    "```\n",
    "\n",
    "接着我们使用`epoll_ctl`向`epfd`指向的内核文件中注册我们关心的文件描述符(参数fd)以及关于这个文件描述符我们关心的事件(events)。\n",
    "```\n",
    "int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event)\n",
    "```\n",
    "\n",
    "epoll支持的事件类型如下：\n",
    "\n",
    "| 事件| 描述    |\n",
    "|:-----|:-------|\n",
    "|EPOLLIN   | 数据可读\n",
    "|EPOLLOUT  |数据可写\n",
    "|EPOLLERR  |错误\n",
    "|EPOLLNVAL | 文件描述符未打开\n",
    "\n",
    "注册好了事件之后，我们就可以调用epol_wait向操作系统询问发生的事件列表了。\n",
    "\n",
    "```c\n",
    "int epoll_wait(int epfd,, struct epoll_event* event, int maxevents, int timeout)\n",
    "```\n",
    "\n",
    "epoll_wait将发生的事件保存在event指针所指向的数组中，通过函数的返回值我们可以得到事件数组的长度。 可以通过events中的data属性取到发生了该事件的文件描述符。同时调用`epoll_wait`的时候，如果没有事件发生，我们可以指定一个等待超时时间，如果等待时间到了还没有事件发生，函数就返回，如果在等待超时时间之前就发生了事件，则立刻返回，总而言之，我们在`epoll_wait`函数调用这里停留最多timeout时间， 如果把timeout设置为-1， 则会一直等待到有事件发生为止，如果设置等待时间为0， 则epoll_wait函数会立刻返回，不会拿到任何事件，这一点性质使得`epoll_wait`有的时候被当做定时器来用。有了epoll，我们就可以很方便的并发处理多个客户端请求了。在每个循环中，我们只需要通过`epoll_wait`来获取所有发生的事件及发生了该事件的客户端文件描述符，如果发生了可读事件EPOLLIN，我们就读取用户请求，处理其请求。如果发生了可写事件，EPOLLOUT，我们就向socket文件中写入结果数据并向用户发送，而不需要像之前那样要遍历所有的文件描述符。我们可以这样写我们的服务器。\n",
    "\n",
    "```c\n",
    "int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, TIME_OUT);\n",
    "for (int i = 0; i< ret; i++){\n",
    "     int socketfd = events[i].data.fd;\n",
    "     /*sockfd就绪，直接处理*/\n",
    "}\n",
    "```\n",
    "\n",
    "可能有同学会有疑问，**用epoll的确方便了用户，因为用户可以直接处理从epoll_wait拿到的事件列表，但这样为什么比循环遍历速度快呢？主要原因有如下两点:**\n",
    "\n",
    "1. 文件状态的探测本身涉及到系统调用，我们前面提过系统调用本身是个耗时的操作，可以想象像微信这样的产品，同时服务的客户端（我们用户）达到几十亿，如果采用循环遍历文件描述符的处理方式，那耗费的时间将是巨大的\n",
    "2. 我们连接上百万上千万个客户端，但所有的客户端中处于活跃状态的客户端其实大部分时候都是少数的。还以微信举例，全球有几十亿用户，但在同一时刻，并不是所有的用户会操作微信，这样每时每刻处于活跃的客户端相比较所有用户数目来说是极其少数的。这样，我们每次遍历所有文件描述符就不值得了，因为我们可能遍历了10亿个用户，最终取出来活跃的只有几千万个，那有90%的遍历都是无意义的操作，而通过epoll我们可以只获取已经发生了的事件，即微信可以只获取了那些在某个时刻操作了微信的用户，这样大大提高了处理的效率。\n",
    "\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Redis事件调度\n",
    "\n",
    "## 文件事件和时间事件\n",
    "\n",
    "IO多路复用在Redis中被重度使用，基于不同的操作系统和用户环境，Redis会使用不同的接口，比如Unix下有select，poll， epoll。Redis对三种系统调用进行了统一的接口封装，不失一般性，我们假定在下文的代码中，Redis的事件监控采用的是epoll系统调用。在Redis中，主要有两类事件：TimeEvents和FileEvents。 顾名思义，前者是时间事件，一般都是一些定时任务，比如每隔1个小时就进行一次数据持久化，后者是socket文件的IO事件，比如文件描述符是否可读了，文件描述符是否可写等。\n",
    "\n",
    "时间事件和文件事件定义分别如下：\n",
    "\n",
    "```c\n",
    "/* File event structure\n",
    " *\n",
    " * 文件事件结构\n",
    " */\n",
    "typedef struct aeFileEvent {\n",
    "\n",
    "    // 监听事件类型掩码，\n",
    "    // 值可以是 AE_READABLE 或 AE_WRITABLE ，\n",
    "    int mask;\n",
    "\n",
    "    // 读事件处理器\n",
    "    aeFileProc *rfileProc;\n",
    "\n",
    "    // 写事件处理器\n",
    "    aeFileProc *wfileProc;\n",
    "\n",
    "    // 多路复用库的私有数据\n",
    "    void *clientData;\n",
    "\n",
    "} aeFileEvent;\n",
    "/* Time event structure\n",
    " *\n",
    " * 时间事件结构\n",
    " */\n",
    "typedef struct aeTimeEvent {\n",
    "\n",
    "    // 时间事件的唯一标识符\n",
    "    long long id; /* time event identifier. */\n",
    "\n",
    "    // 事件的到达时间\n",
    "    long when_sec; /* seconds */\n",
    "    long when_ms; /* milliseconds */\n",
    "\n",
    "    // 事件处理函数\n",
    "    aeTimeProc *timeProc;\n",
    "\n",
    "    // 事件释放函数\n",
    "    aeEventFinalizerProc *finalizerProc;\n",
    "\n",
    "    // 多路复用库的私有数据\n",
    "    void *clientData;\n",
    "\n",
    "    // 指向下个时间事件结构，形成链表\n",
    "    struct aeTimeEvent *next;\n",
    "\n",
    "} aeTimeEvent;\n",
    "```\n",
    "\n",
    "对于`aeFileEvent`我们关心的是可读和可写事件，并对两种事件分别绑定两个回调函数：读事件处理器`rfileProc`和写事件处理器`wfileProc`。当读事件发生后，我们调用读事件处理器，当可写事件发生后，我们调用写事件处理器。aeTimeEvent里包括了该定时事件希望发生的时间以及事件发生后调用的回调函数`timeProc`, 其中`when_sec`和`when_ms`指的是事件发生时的时间戳，**Redis支持到ms级别的精度。**同时我们还发现相比较`aeFileEvent`，时间事件结构体中多了一个**链表指针**，指向下一个时间事件。这是为什么呢？因为我们刚才提到，文件事件即IO事件的管理实际上我们使托管给了操作系统，他会帮我们托管好。但是时间事件，操作系统并没有提供了一个类似的功能供我们使用，所以需要Redis自己去管理。而Redis采用了链表来存储管理所有的时间事件。Redis还定义了一种叫`eventloop`的数据结构，用来管理所有的事件。其数据结构如下，存储所有注册了的aeFileEvent和所有的`aeTimeEvent`, 由于Redis采用链表管理所有的时间事件，所以只需要保存时间事件的链表头即可。\n",
    "\n",
    "```c\n",
    "/* State of an event based program \n",
    " *\n",
    " * 事件处理器的状态\n",
    " */\n",
    "typedef struct aeEventLoop {\n",
    "\n",
    "    // 目前已注册的最大描述符\n",
    "    int maxfd;   /* highest file descriptor currently registered */\n",
    "\n",
    "    // 目前已追踪的最大描述符\n",
    "    int setsize; /* max number of file descriptors tracked */\n",
    "\n",
    "    // 用于生成时间事件 id\n",
    "    long long timeEventNextId;\n",
    "\n",
    "    // 最后一次执行时间事件的时间\n",
    "    time_t lastTime;     /* Used to detect system clock skew */\n",
    "\n",
    "    // 已注册的文件事件\n",
    "    aeFileEvent *events; /* Registered events */\n",
    "\n",
    "    // 已就绪的文件事件\n",
    "    aeFiredEvent *fired; /* Fired events */\n",
    "\n",
    "    // 时间事件\n",
    "    aeTimeEvent *timeEventHead;\n",
    "\n",
    "    // 事件处理器的开关\n",
    "    int stop;\n",
    "\n",
    "    // 多路复用库的私有数据\n",
    "    void *apidata; /* This is used for polling API specific data */\n",
    "\n",
    "    // 在处理事件前要执行的函数\n",
    "    aeBeforeSleepProc *beforesleep;\n",
    "\n",
    "} aeEventLoop;\n",
    "```\n",
    "## 事件调度\n",
    "\n",
    "**接下来就要进入Redis的大量源码分析了**，在大家读的时候，一定要先看下这些代码，注释都加的比较详细，所以还比较容易懂。Redis程序启动之后，首先进行服务器的初始化，然后进行一系列的其他操作，最后开始进入到`aeMain`事件循环中。服务器初始化做了哪些事情我们后面会有提及，我们先看事件循环函数。在`aeMain`中，事件循环的每一步Redis都会执行`aeProcessEvents()`函数处理注册好的各种事件，而且在进事件处理函数之前，程序会先做一些过期键检查等操作，这个部分暂时可以忽略，日后有需要会再讲。\n",
    "\n",
    "```c++\n",
    "int main(int argc, char **argv) {\n",
    "    struct timeval tv;\n",
    "\t...\n",
    "    \n",
    "    // 初始化服务器\n",
    "    initServerConfig();\n",
    "\n",
    "    ...\n",
    "    aeMain(server.el);\n",
    "\n",
    "    // 服务器关闭，停止事件循环\n",
    "    aeDeleteEventLoop(server.el);\n",
    "\n",
    "    return 0;\n",
    "}\n",
    "\n",
    "\n",
    "\n",
    "/*\n",
    " * 事件处理器的主循环\n",
    " */\n",
    "void aeMain(aeEventLoop *eventLoop) {\n",
    "\n",
    "    eventLoop->stop = 0;\n",
    "\n",
    "    while (!eventLoop->stop) {\n",
    "\n",
    "        // 在进入每个事件循环之前，Redis会进行一些操作，比如过期键的检查以及AOF文件写入等\n",
    "        if (eventLoop->beforesleep != NULL)\n",
    "            eventLoop->beforesleep(eventLoop);\n",
    "\n",
    "        // 开始处理事件\n",
    "        aeProcessEvents(eventLoop, AE_ALL_EVENTS);\n",
    "    }\n",
    "}\n",
    "```\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**进入到了事件处理函数，就真正进入到了事件调度部分，这里也是我们本次内容的重点**。`aeProcessEvent`函数的执行逻辑如下图所示，首先检查时间事件并根据结果确定`epoll_wait`中的超时参数。如果有时间事件的时间戳已经过了，比如原本该事件预计发生的时间是13:00，结果现在已经13:01了，那么就把超时参数设置为0， 即不获取文件事件，立刻处理时间事件。反之，如果发现没有注册任何时间事件，则在第二步可以把time_out设置为-1， 让`epoll_wait`一直等到有文件事件发生为止。具体我们下文的详细代码分析。\n",
    "\n",
    "![pic](../../assets/redis4.jpg)\n",
    "\n",
    "在事件调度中，Redis首先要尽可能保证的是，**如果有定时任务（时间事件）发生了，应尽快的处理时间事件**。所以首先检查所有时间事件中预计发生时间最早的任务，**即获得时间事件链表中时间戳最小的事件的指针。**\n",
    "\n",
    "```c\n",
    "aeTimeEvent *shortest = NULL;\n",
    "struct timeval tv, *tvp;\n",
    "// 获取待发生时间最早的事件\n",
    "if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))\n",
    "  shortest = aeSearchNearestTimer(eventLoop);\n",
    "```\n",
    "\n",
    "**如果`shortest`返回是个空指针，说明`eventloop`里面就没有时间事件，**因为只要有时间事件都会有返回，那我们就可以专心处理文件事件。由于我们将文件事件的管理托管给了操作系统，所以我们需要先调用`epoll_wait`函数获得所有发生的事件数组。Redis将`epoll_wait`封装为了`aeApiPoll`函数，其中`tvp`参数是一个时间戳的结构体指针，用于指定`epoll_wait`中的超时参数。由于我们没有时间事件需要处理，所以我们在这里就可以将tvp设置为`NULL`，让`epoll_wait`函数始终等下去，直到有文件事件发生。从`aeApiPoll`拿到了所有的已发生事件之后，我们就依次调用对应的处理函数。如果是个可读事件，我们就执行该事件绑定的读事件处理器，如果是可写事件，则执行绑定的写事件处理器。\n",
    "\n",
    "```c\n",
    "numevents = aeApiPoll(eventLoop, tvp);\n",
    "//处理所有已经发生的事件\n",
    "for (j = 0; j < numevents; j++) {\n",
    "    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];\n",
    "\n",
    "    int mask = eventLoop->fired[j].mask;\n",
    "    int fd = eventLoop->fired[j].fd;\n",
    "    int rfired = 0;\n",
    "\n",
    "    // 可读事件发生，执行读事件处理器\n",
    "    if (fe->mask & mask & AE_READABLE) {\n",
    "        rfired = 1;\n",
    "        fe->rfileProc(eventLoop,fd,fe->clientData,mask);\n",
    "    }\n",
    "    // 可写事件发生，执行写事件发生器\n",
    "    if (fe->mask & mask & AE_WRITABLE) {\n",
    "        if (!rfired || fe->wfileProc != fe->rfileProc)\n",
    "            fe->wfileProc(eventLoop,fd,fe->clientData,mask);\n",
    "         }\n",
    "\n",
    "        processed++;\n",
    "    }\n",
    "```\n",
    "\n",
    "**那么如果我们有时间事件会怎么样呢**，我们前面提到过，Redis会尽量保证当时间事件发生后，尽快的先处理时间事件。`shortest`返回的就是所有时间事件中会最早发生的那个。首先我们需要比较当前时间戳和`shortest`任务的时间戳，如果当前时间已经过了`shortest`所指的事件的预定时间了，我们得刻不容缓的处理时间事件了，所以把`tvp`的等待时间设置为0，这样当我们执行`aeApiPoll`时，会立即返回。\n",
    "\n",
    "```c\n",
    "// 如果时间事件存在的话\n",
    "// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间\n",
    "long now_sec, now_ms;\n",
    "// 计算距今最近的时间事件还要多久才能达到\n",
    "// 并将该时间距保存在 tv 结构中\n",
    "aeGetTime(&now_sec, &now_ms);\n",
    "tvp = &tv;\n",
    "// 比较当前时间和会最早发生的事件的时间戳\n",
    "tvp->tv_sec = shortest->when_sec - now_sec;\n",
    "// 时间差小于 0 ，说明事件已经可以执行了，将秒和毫秒设为 0 （不阻塞）\n",
    "if (tvp->tv_sec < 0) tvp->tv_sec = 0;\n",
    "if (tvp->tv_usec < 0) tvp->tv_usec = 0;\n",
    "```\n",
    "\n",
    "如果还没到`shotest`事件对应的时间戳，我们就以`shortest`的时间戳和当前的时间戳的差值作为`aeApiPoll`的等待时间参数。比如最早发生的时间预计距离目前两分钟后，那么我们就设置在等待文件事件上最多阻塞2分钟。那么有同学可能有疑惑，那如果在这两分钟内，`aeApiPoll`返回了很多文件事件，按照`aeProcess`函数的逻辑，是会先处理这些文件事件的，**那么是不是很可能这部分文件事件处理结束时早已经过了2分钟了，即已经过了时间事件预计该发生的时间了？**是有这个可能的，但是对此Redis并不做什么措施。所以**定时任务可能不会严格与预定的时间一致**。比如可能预定的时间是13:00， 但最终任务被执行的时间可能是13:01。当然了，只会晚，不会早。\n",
    "\n",
    "```c\n",
    "//如果shortest的时间戳还没到，就用时间差设置aeApiPoll的等待参数\n",
    "if (shortest->when_ms < now_ms) {\n",
    "     tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;\n",
    "     tvp->tv_sec --;\n",
    "} else {\n",
    "     tvp->tv_usec = (shortest->when_ms - now_ms)*1000;\n",
    "}\n",
    "```\n",
    "\n",
    "至此我们已经知道了`aeProcessEvent`函数的整体执行逻辑了，我们将`aeProcessEvent`的整体函数贴在下面。当Redis启动之后，Redis就会循环执行这个函数，**所以理解这个函数对理解Redis的任务调度至关重要。**\n",
    "\n",
    "```c\n",
    "int aeProcessEvents(aeEventLoop *eventLoop, int flags)\n",
    "{\n",
    "    int processed = 0, numevents;\n",
    "\n",
    "    /* Nothing to do? return ASAP */\n",
    "    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;\n",
    "\n",
    "    /* Note that we want call select() even if there are no\n",
    "     * file events to process as long as we want to process time\n",
    "     * events, in order to sleep until the next time event is ready\n",
    "     * to fire. */\n",
    "    if (eventLoop->maxfd != -1 ||\n",
    "        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {\n",
    "        int j;\n",
    "        aeTimeEvent *shortest = NULL;\n",
    "        struct timeval tv, *tvp;\n",
    "\n",
    "        // 获取最近的时间事件\n",
    "        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))\n",
    "            shortest = aeSearchNearestTimer(eventLoop);\n",
    "        if (shortest) {\n",
    "            // 如果时间事件存在的话\n",
    "            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间\n",
    "            long now_sec, now_ms;\n",
    "\n",
    "            /* Calculate the time missing for the nearest\n",
    "             * timer to fire. */\n",
    "            // 计算距今最近的时间事件还要多久才能达到\n",
    "            // 并将该时间距保存在 tv 结构中\n",
    "            aeGetTime(&now_sec, &now_ms);\n",
    "            tvp = &tv;\n",
    "            tvp->tv_sec = shortest->when_sec - now_sec;\n",
    "            if (shortest->when_ms < now_ms) {\n",
    "                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;\n",
    "                tvp->tv_sec --;\n",
    "            } else {\n",
    "                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;\n",
    "            }\n",
    "\n",
    "            // 时间差小于 0 ，说明事件已经可以执行了，将秒和毫秒设为 0 （不阻塞）\n",
    "            if (tvp->tv_sec < 0) tvp->tv_sec = 0;\n",
    "            if (tvp->tv_usec < 0) tvp->tv_usec = 0;\n",
    "        } else {\n",
    "            \n",
    "            // 执行到这一步，说明没有时间事件\n",
    "            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞，以及阻塞的时间长度\n",
    "\n",
    "            /* If we have to check for events but need to return\n",
    "             * ASAP because of AE_DONT_WAIT we need to set the timeout\n",
    "             * to zero */\n",
    "            if (flags & AE_DONT_WAIT) {\n",
    "                // 设置文件事件不阻塞\n",
    "                tv.tv_sec = tv.tv_usec = 0;\n",
    "                tvp = &tv;\n",
    "            } else {\n",
    "                /* Otherwise we can block */\n",
    "                // 文件事件可以阻塞直到有事件到达为止\n",
    "                tvp = NULL; /* wait forever */\n",
    "            }\n",
    "        }\n",
    "\n",
    "        // 处理文件事件，阻塞时间由 tvp 决定\n",
    "        numevents = aeApiPoll(eventLoop, tvp);\n",
    "        for (j = 0; j < numevents; j++) {\n",
    "            // 从已就绪数组中获取事件\n",
    "            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];\n",
    "\n",
    "            int mask = eventLoop->fired[j].mask;\n",
    "            int fd = eventLoop->fired[j].fd;\n",
    "            int rfired = 0;\n",
    "\n",
    "           /* note the fe->mask & mask & ... code: maybe an already processed\n",
    "             * event removed an element that fired and we still didn't\n",
    "             * processed, so we check if the event is still valid. */\n",
    "            // 读事件\n",
    "            if (fe->mask & mask & AE_READABLE) {\n",
    "                // rfired 确保读/写事件只能执行其中一个\n",
    "                rfired = 1;\n",
    "                fe->rfileProc(eventLoop,fd,fe->clientData,mask);\n",
    "            }\n",
    "            // 写事件\n",
    "            if (fe->mask & mask & AE_WRITABLE) {\n",
    "                if (!rfired || fe->wfileProc != fe->rfileProc)\n",
    "                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);\n",
    "            }\n",
    "\n",
    "            processed++;\n",
    "        }\n",
    "    }\n",
    "\n",
    "    /* Check time events */\n",
    "    // 执行时间事件\n",
    "    if (flags & AE_TIME_EVENTS)\n",
    "        processed += processTimeEvents(eventLoop);\n",
    "\n",
    "    return processed; /* return the number of processed file/time events */\n",
    "}\n",
    "```\n",
    "\n",
    "如果大家有研究过Redis的这部分代码，可能会和当初的我一样有一个疑惑： **Redis一直在循环执行这个函数，不停的获得时间事件和文件事件然后进行处理，这没问题，很好理解，但是似乎没看见在哪里注册这些事件啊？获得的这些事件到底是什么时候注册的？**\n",
    "\n",
    "## 事件注册\n",
    "\n",
    "Redis是一个服务器，从头到尾，Redis都在不停的做着这么几件事：\n",
    "\n",
    "1. 当有新的客户端连接时，需要新建一个表示客户端的Client数据结构，并为客户端之分配一个socket文件\n",
    "2. 当已连接的用户发起新的请求时，读取请求并进行处理\n",
    "3. 处理完用户的请求之后，将结果写入socket文件\n",
    "\n",
    "\n",
    "对应上述三个工作，我们分别需要注册的三个文件事件是：\n",
    "\n",
    "1. 服务器端口监听socket文件可读事件，当文件可读时，说明有新客户端连接，则创建客户端\n",
    "2. 客户端socket文件可读事件，当文件可读时，说明用户发起了新的命令请求，则读取请求并处理请求\n",
    "3. 客户端socket文件可写事件，当文件可写时，说明可以向用户发送命令执行结果。\n",
    "\n",
    "接下来，Redis是如何把上述的三个事件注册到事件循环的。\n",
    "\n",
    "**服务器端口socket文件可读事件**\n",
    "\n",
    "前文提到，当Redis服务器启动的时候进行了一个`initServerConfig()`的服务器初始化操作。在这个函数中，发现了第一个文件事件注册操作。我们说过Unix中一切都是文件，在`listenToPort`这里Redis进行了TCP的端口监听，对端口绑定了socket文件。这个文件发生可读事件时说明有了新的客户端连接。**接着对所有socket文件都创建了一个可读文件事件注册到事件循环，并通过epoll注册到操作系统中。**当可读事件发生时，我们就可以在执行上文提到的`aeProcessEvent`函数的`aeApiPoll`接口时获取到该事件，并执行该事件绑定的函数。\n",
    "\n",
    "```c\n",
    "void initServer() {\n",
    "    int j;\n",
    "    ...\n",
    "    // 打开 TCP 监听端口，用于等待客户端的命令请求\n",
    "    if (server.port != 0 &&\n",
    "        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)\n",
    "        exit(1);\n",
    "     ....\n",
    "    // 为 TCP 连接关联连接应答（accept）处理器\n",
    "    // 用于接受并应答客户端的 connect() 调用\n",
    "    for (j = 0; j < server.ipfd_count; j++) {\n",
    "        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,\n",
    "            acceptTcpHandler,NULL) == AE_ERR)\n",
    "            {\n",
    "                redisPanic(\n",
    "                    \"Unrecoverable error creating server.ipfd file event.\");\n",
    "            }\n",
    "    }\n",
    "\n",
    "  \n",
    "}\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**客户端socket文件可读事件**\n",
    "\n",
    "我们发现上面的可读事件注册了一个`acceptTcpHandler`的函数，这个函数首先为客户端创建一个socket文件，返回其文件描述符cfd， 接着进入`acceptCommandHandler`函数中尝试为连接的客户端创建一个客户端结构体，即`redisClient`结构体。在创建客户端的函数中，我们有了一个新的发现：**创建客户端的时候为该客户端的socket文件创建了一个可读文件事件。**可读事件发生时，说明该客户端发送来了命令请求。这一步把该事件加入到事件循环中，并通过epoll注册进操作系统。此时当客户端发来命令时该文件的可读事件被触发，我们就可以在`aeProcessEvent`函数中通过`aeApiPoll`获取到该事件，并执行该事件绑定的函数。\n",
    "\n",
    "```c\n",
    "void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {\n",
    "    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;\n",
    "    // accept 客户端连接\n",
    "    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);\n",
    "    // 为客户端创建客户端状态（redisClient）\n",
    "    acceptCommonHandler(cfd,0);\n",
    "}\n",
    "\n",
    "static void acceptCommonHandler(int fd, int flags) {\n",
    "\n",
    "    // 创建客户端\n",
    "    redisClient *c;\n",
    "    if ((c = createClient(fd)) == NULL) {\n",
    "        // 如果创建客户端失败，则关闭fd\n",
    "        close(fd); \n",
    "        return;\n",
    "    }\n",
    "}\n",
    "\n",
    "/*\n",
    " * 创建一个新客户端\n",
    " */\n",
    "redisClient *createClient(int fd) {\n",
    "\n",
    "    // 分配空间\n",
    "    redisClient *c = zmalloc(sizeof(redisClient));\n",
    "    ...\n",
    "    if (fd != -1) {\n",
    "        ...\n",
    "        //为客户端绑定一个读取请求的函数，并加入到事件循环\n",
    "        if (aeCreateFileEvent(server.el,fd,AE_READABLE,\n",
    "            readQueryFromClient, c) == AE_ERR)\n",
    "        {\n",
    "            close(fd);\n",
    "            zfree(c);\n",
    "            return NULL;\n",
    "        }\n",
    "    }\n",
    "\n",
    "    // 初始化各个属性\n",
    "    ...\n",
    "    // 返回客户端\n",
    "    return c;\n",
    "}\n",
    "```\n",
    "\n",
    "**客户端socket文件可写事件**\n",
    "\n",
    "在创建客户端的时候，绑定了一个叫`readQueryFromClient`函数。看到名字我们就知道这个是读取客户端命令请求的函数。`readQueryFromClient`函数主要做的就是读取客户端命令，并执行命令。读取完所有的命令后，调用`processInputBuffer`函数进行命令执行处理，而在`processInputBuffer`中的命令处理实际上是在`processCommand`函数中进行。我们以服务器执行客户端发来的quit命令为例。在`processCommand`函数中，处理完该命令请求后，会调用`addReply`函数向用户发送结果，而在`addReply`首先会调用prepareClientToWrite函数。\n",
    "\n",
    "最后的最后，在`prepareClientToWrite`函数中，我们终于发现了最后一类事件的注册：客户端socket可写事件注册！我们发现在这里注册了一个可写文件事件并加入到事件循环，当该文件可写的时候，说明可以把结果发送给了客户端。这样，当该事件发生时，我们就可以在`aeProcessEvent`中调用`aeApiPoll`时获得该事件，并调用其回调函数`sendReplyToClient`把结果写入socket文件中发送给客户端。\n",
    "\n",
    "```c\n",
    "/*\n",
    " * 读取客户端的查询缓冲区内容\n",
    " */\n",
    "void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {\n",
    "    redisClient *c = (redisClient*) privdata;\n",
    "  \n",
    "    server.current_client = c;\n",
    "    ...\n",
    "    // 从查询缓存重读取内容，创建参数，并执行命令\n",
    "    // 函数会执行到缓存中的所有内容都被处理完为止\n",
    "    processInputBuffer(c);\n",
    "\n",
    "    server.current_client = NULL;\n",
    "}\n",
    "int processCommand(redisClient *c) {\n",
    "\n",
    "    // 特别处理 quit 命令\n",
    "    if (!strcasecmp(c->argv[0]->ptr,\"quit\")) {\n",
    "        addReply(c,shared.ok);\n",
    "        c->flags |= REDIS_CLOSE_AFTER_REPLY;\n",
    "        return REDIS_ERR;\n",
    "    }\n",
    "   ...\n",
    "}\n",
    "\n",
    "void addReply(redisClient *c, robj *obj) {\n",
    "\n",
    "    // 为客户端安装写处理器到事件循环\n",
    "    if (prepareClientToWrite(c) != REDIS_OK) return;\n",
    "    ...\n",
    "}\n",
    "\n",
    "int prepareClientToWrite(redisClient *c) {\n",
    "\n",
    "    ...\n",
    "    // 一般情况，为客户端套接字安装写处理器到事件循环\n",
    "    if (c->bufpos == 0 && listLength(c->reply) == 0 &&\n",
    "        (c->replstate == REDIS_REPL_NONE ||\n",
    "         c->replstate == REDIS_REPL_ONLINE) &&\n",
    "        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,\n",
    "        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;\n",
    "\n",
    "    return REDIS_OK;\n",
    "}\n",
    "```\n",
    "\n",
    "至此，我们完成了所有需要的文件事件的注册。整体流程如下：\n",
    "\n",
    "![pic](../../assets/redis5.jpg)\n",
    "\n",
    "当有新的客户端连接时，服务器端口绑定的socket文件可读事件触发，此时调用回调函数`acceptTcpHandler`创建客户端，并同时将客户端的socket文件注册一个可读事件。当已连接用户发起命令请求时，可读事件被触发，并调用其回调函数`readQueryFromClient`读取客户端命令，接着执行命令。执行命令结束后，注册被处理客户端socket文件的可写事件，当该事件被触发时，调用`sendReplyToClient`函数将结果写入到对应文件发送给客户端。同学可能会问，为啥没有提到时间事件的注册？由于篇幅有限，这里就暂时不提了。\n",
    "\n",
    "# 总结\n",
    "\n",
    "本篇我们主要介绍了IO复用技术，并着重分析了Redis中是如何基于IO复用进行事件调度。我们发现其实调度函数里的逻辑是比较简单的。整个`aeProcessEvent`函数也非常的短。关键在于弄明白这些事件是如何以及什么时候被注册进了事件循环。由于这篇博客涉及到了非常多的源码，所以可能可读性比较差,但是我已经尽可能的把原理解释清楚，我相信如果大家搞懂了我上面提到的事件调度，会对你研究Redis的源码以及搞懂Redis的运作流程有非常大的帮助。其实本来这篇还想写Python的协程实现的，因为我前两天研究了Python的协程调度实现后，发现二者惊人的一致。\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "202.969px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
